
Scala

Data Definition Language (DDL)

OpenHouse tables use Apache Iceberg as the underlying table format. You can use native Spark syntax to create, alter, and drop tables, but note that OpenHouse imposes some constraints.

For DDL statements such as CREATE TABLE, ALTER TABLE, and GRANT/REVOKE, use .sql() on the SparkSession.
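As a minimal sketch, DDL goes through spark.sql; the database, table, column, and property names below are placeholders:

// Hypothetical names; adjust the schema to your environment.
spark.sql("CREATE TABLE openhouse.dbName.tableName (id BIGINT, data STRING)")
spark.sql("ALTER TABLE openhouse.dbName.tableName SET TBLPROPERTIES ('key' = 'value')")

You can also use the native Spark Scala API to create a table if it does not exist: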

import org.apache.spark.sql.functions

// Your code preparing the DataFrame

df.writeTo("openhouse.<dbName>.<tableName>").create()

For the full list of supported statements, see SQL DDL commands.

Reads

To query a table, run the following:

val df = spark.table("openhouse.db.table")

You can also filter the data with column expressions, as follows:

import org.apache.spark.sql.functions.col
val filtered_df = df.filter(col("datepartition") > "2022-05-10")

With Time-Travel

Identify the older snapshot you want to read by Inspecting Metadata. Then run the following:

spark.read.option("snapshot-id", 1031031135387001532L).table("openhouse.db.table")
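One way to find a snapshot ID is to query Iceberg's snapshots metadata table; a sketch, assuming the standard Iceberg metadata tables are exposed through the openhouse catalog:

// List snapshot IDs with commit time and operation type.
spark.sql("SELECT snapshot_id, committed_at, operation FROM openhouse.db.table.snapshots").show(false)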

Writes

We highly recommend adopting Apache Spark's DataFrameWriterV2 API, introduced in Spark 3, when programming with the DataFrame API.

The following are example Scala statements in Spark 3 for creating and writing to tables, including partitioned tables.

Create Table

import org.apache.spark.sql.functions

// Your code preparing the DataFrame

df.writeTo("openhouse.db.table").create()
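DataFrameWriterV2 also exposes createOrReplace() for replacing an existing table's contents and schema in one call; a sketch, subject to whatever constraints OpenHouse places on table replacement:

// Replace the table if it exists, otherwise create it.
df.writeTo("openhouse.db.table").createOrReplace()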

Create Partitioned Table

import org.apache.spark.sql.functions

// Your code preparing the DataFrame

df.sortWithinPartitions("datepartition")
  .writeTo("openhouse.db.table")
  .partitionedBy(functions.col("datepartition"))
  .create()
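Iceberg's hidden partitioning can also be expressed with Spark's partition transforms (days, months, hours, bucket). A sketch, assuming a hypothetical table with a timestamp column ts:

import org.apache.spark.sql.functions.{col, days}

// Sorting by the raw timestamp keeps rows of the same day together.
df.sortWithinPartitions(col("ts"))
  .writeTo("openhouse.db.timestampTable")
  .partitionedBy(days(col("ts")))  // partition on day(ts) without a separate column
  .create()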

Append Data to Partitioned Table

append_df.sortWithinPartitions("datepartition")
  .writeTo("openhouse.db.table")
  .append()

Overwrite Data

// Dynamic partition overwrite: any partition with at least one
// row matched by overwrite_df will be replaced.
overwrite_df.sortWithinPartitions("datepartition")
  .writeTo("openhouse.db.table")
  .overwritePartitions()

// To overwrite only rows matching an explicit filter, use overwrite().
// The $"..." column syntax requires: import spark.implicits._
overwrite_df.sortWithinPartitions("datepartition")
  .writeTo("openhouse.db.table")
  .overwrite($"level" === "INFO")
note

An explicit sort is necessary when writing to a partitioned table because, as of Spark 3.0, Spark does not allow Iceberg to request a sort before writing. See more at link.